项目介绍 该项目是一款基于 Netty 框架开发的高性能、轻量级即时通讯(IM)系统。它采用 Protobuf 作为序列化协议,旨在提供一个稳定、可扩展的双工通信架构,可以作为即时通讯的基础底座。
主要功能
实时双工通信:基于 TCP 长连接,实现毫秒级的消息收发延迟。
身份认证系统:支持用户登录/注销流程,确保每个连接的合法性。
私聊路由 (P2P):服务端根据 UID 自动寻址,精准将消息投递至目标 Channel。
心跳与链路保活:内置心跳检测机制,能够自动剔除死链接(僵尸连接)。
异常自愈:客户端具备完善的断线监听与自动重连逻辑。
控制台交互:提供人性化的命令行菜单,支持指令式操作。
组件架构 以下是 IM 单体项目的核心架构和组件全景图:
Client (客户端)
UI/Console 层
CommandController: 流程调度中心,管理 connectLock 同步锁
ConsoleCommand: 指令解析系统 (Login/Chat/Logout)
ClientSession: 本地会话管理,绑定 UID 与 Channel
Sender 业务层 (Outbound)
职责:封装 Protobuf 协议包并执行发送
LoginSender
ChatSender
LogoutSender
Netty Pipeline (Inbound)
Decoder: 字节流转 Protobuf POJO
LoginResponseHandler: 登录成功即自销毁
-- 登录成功后动态注入逻辑 --
HeartBeatClientHandler: 周期性探活
ChatMsgHandler: 实时消息接收
ExceptionHandler: 异常捕获与重连引导
TCP / Protobuf
[1] 身份认证握手
[2] 心跳维持(30s)
[3] P2P 消息路由
[4] 优雅断开连接
Server (服务端)
SessionMap 寻址中心
全局 ConcurrentHashMap:记录 UID ⇄ Channel。通过该 Map 实现跨通道消息精准转发。
Netty Pipeline (Handler 链)
Encoder: 对象序列化为字节流
LoginRequestHandler: 权限与 Token 校验
HeartBeatServerHandler: 读超时自动断开
ChatRedirectHandler: 核心路由与转发
Business Processor (异步处理)
解耦 IO 线程,处理耗时业务
LoginProcessor
LogoutProcessor
IM系统架构与组件协作总结:
连接同步: CommandController 利用 wait/notify 机制,使控制台线程在Netty异步连接结果返回前保持阻塞。
状态管理: 客户端登录成功后通过 pipeline.remove/addAfter 实现处理器动态热插拔。
消息转发: 服务端拦截ChatRequest,通过SessionMap检索目标UID关联的Channel并执行 writeAndFlush。
客户端:
CommandController: 负责同步用户输入与网络层异步回调。
Sender 族 (Login/Chat/Logout): 负责将业务 POJO 封装为符合协议规范的字节流。
Pipeline 处理器:
LoginResponseHandler: 状态转换守卫,登录成功后执行 Pipeline 热插拔。
ChatMsgHandler: 监听入站消息并渲染到控制台。
服务端:
SessionMap: 全局并发哈希表,维护 UID ⇄ Channel 映射,是消息分发的心脏。
RedirectHandler: 消息路由中心,负责解析目标地址并完成跨通道路由。
Business Processors: 独立于 IO 线程的业务执行单元,处理耗时逻辑(如登录校验)。
客户端原理和流程
其他说明 环境依赖 :JDK 17+、Netty 4.x、Protobuf 3.x
启动说明 :
Step 1: 启动服务端 ChatNettyServer,默认监听指定端口。
Step 2: 启动客户端 ChatNettyClient。
Step 3: 进入控制台,根据菜单指令进行操作:
输入 1 进行登录。
输入 2 发起聊天(需指定对方 UID)。
输入 10 退出登录并断开连接。
单体IM系统源码 父模块 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.demo</groupId > <artifactId > im-solo</artifactId > <version > 1.0-SNAPSHOT</version > <packaging > pom</packaging > <properties > <java.version > 17</java.version > <maven.compiler.source > 17</maven.compiler.source > <maven.compiler.target > 17</maven.compiler.target > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <protobuf.version > 4.28.2</protobuf.version > <netty.version > 4.2.12.Final</netty.version > <guava.version > 33.4.0-jre</guava.version > <spring-boot.version > 3.5.13</spring-boot.version > <logback.version > 1.5.25</logback.version > <lombok.version > 1.18.44</lombok.version > </properties > <modules > <module > im-client</module > <module > im-server</module > <module > im-common</module > </modules > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-dependencies</artifactId > <version > ${spring-boot.version}</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-lang3</artifactId > <version > 3.18.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.18.0</version > </dependency > </dependencies > </dependencyManagement > </project >
common 模块 依赖配置 重点是需要引入 protobuf 需要的依赖和插件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > com.demo</groupId > <artifactId > im-solo</artifactId > <version > 1.0-SNAPSHOT</version > </parent > <artifactId > im-common</artifactId > <dependencies > <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > ${netty.version}</version > </dependency > <dependency > <groupId > com.google.protobuf</groupId > <artifactId > protobuf-java</artifactId > <version > ${protobuf.version}</version > </dependency > <dependency > <groupId > com.google.guava</groupId > <artifactId > guava</artifactId > <version > ${guava.version}</version > </dependency > <dependency > <groupId > com.google.code.gson</groupId > <artifactId > gson</artifactId > <version > 2.10</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 2.0.60</version > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-lang3</artifactId > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > </dependency > <dependency > <groupId > org.junit.jupiter</groupId > <artifactId > junit-jupiter</artifactId > <version > 5.12.2</version > <scope > test</scope > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-classic</artifactId > <version > ${logback.version}</version > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-core</artifactId > <version > ${logback.version}</version > </dependency > </dependencies > <build > <extensions > <extension > <groupId > kr.motd.maven</groupId > <artifactId > os-maven-plugin</artifactId > <version > 1.7.1</version > </extension > </extensions > <plugins > <plugin > <groupId > org.xolstice.maven.plugins</groupId > <artifactId > protobuf-maven-plugin</artifactId > <version > 0.6.1</version > <configuration > <protoSourceRoot > src/main/proto</protoSourceRoot > <protocArtifact > com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} </protocArtifact > </configuration > <executions > <execution > <goals > <goal > compile</goal > <goal > test-compile</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
定义数据协议 src/main/proto/config/msg.proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 syntax = "proto3" ; package com.owlias.im.common.core.bean.msg;enum HeadType { LOGIN_REQUEST = 0 ; LOGIN_RESPONSE = 1 ; LOGOUT_REQUEST = 2 ; LOGOUT_RESPONSE = 3 ; HEART_BEAT = 4 ; MESSAGE_REQUEST = 5 ; MESSAGE_RESPONSE = 6 ; MESSAGE_NOTIFICATION = 7 ; } message LoginRequest { string uid = 1 ; string deviceId = 2 ; string token = 3 ; uint32 platform = 4 ; string appVersion = 5 ; } message LoginResponse { bool result = 1 ; uint32 code = 2 ; string info = 3 ; uint32 expose = 4 ; } message MessageRequest { uint64 msgId = 1 ; string from = 2 ; string to = 3 ; uint64 time = 4 ; uint32 msgType = 5 ; string content = 6 ; string url = 8 ; string property = 9 ; string fromNick = 10 ; string json = 11 ; } message MessageResponse { bool result = 1 ; uint32 code = 2 ; string info = 3 ; uint32 expose = 4 ; } message MessageNotification { uint64 noId = 1 ; string json = 2 ; string timestamp = 3 ; } message MessageHeartBeat { uint32 seq = 1 ; string uid = 2 ; string json = 3 ; } message Message { HeadType type = 1 ; uint64 sequence = 2 ; string sessionId = 3 ; oneof body { LoginRequest loginRequest = 4 ; LoginResponse loginResponse = 5 ; MessageRequest messageRequest = 6 ; MessageResponse messageResponse = 7 ; MessageNotification notification = 8 ; MessageHeartBeat heartBeat = 9 ; } }
并发相关的设计 CallbackTask
1 2 3 4 5 public interface CallbackTask <R> { R execute () throws Exception; void onBack (R r) ; void onException (Throwable t) ; }
ExecuteTask
1 2 3 public interface ExecuteTask { void execute () ; }
FutureTaskScheduler
1 2 3 4 5 6 7 8 9 10 11 12 13 import com.owlias.im.common.utils.ThreadUtil;import java.util.concurrent.ThreadPoolExecutor;public class FutureTaskScheduler { static final ThreadPoolExecutor mixPool; static { mixPool = ThreadUtil.getMixedTargetThreadPool(); } public static void add (ExecuteTask executeTask) { mixPool.submit(executeTask::execute); } }
CallbackTaskScheduler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import com.google.common.util.concurrent.*;import com.owlias.im.common.utils.ThreadUtil;import java.util.concurrent.ExecutorService;public class CallbackTaskScheduler { static ListeningExecutorService guavaPool; static { ExecutorService jPool = ThreadUtil.getMixedThreadPoolInstance(); guavaPool = MoreExecutors.listeningDecorator(jPool); } public static <R> void add (CallbackTask<R> executeTask) { ListenableFuture<R> future = guavaPool.submit(executeTask::execute); Futures.addCallback(future, new FutureCallback <>() { public void onSuccess (R r) { executeTask.onBack(r); } public void onFailure (Throwable t) { executeTask.onException(t); } }, guavaPool); } }
ThreadUtils,请参考 《Java 工具类之线程相关》 。
POJO类 ChatMsg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 import com.owlias.im.common.core.bean.msg.Msg;import lombok.Data;import org.apache.commons.lang3.StringUtils;@Data public class ChatMsg { private User user; private long msgId; private String from; private String to; private long time; private MSG_TYPE msgType; private String content; private String url; private String property; private String fromNick; private String json; public enum MSG_TYPE { TEXT, AUDIO, VIDEO, POS, OTHER; } public ChatMsg (User user) { if (null == user) { return ; } this .user = user; this .setTime(System.currentTimeMillis()); this .setFrom(user.getUid()); this .setFromNick(user.getNickName()); } public void fillMsg (Msg.MessageRequest.Builder cb) { if (msgId > 0 ) { cb.setMsgId(msgId); } if (StringUtils.isNotEmpty(from)) { cb.setFrom(from); } if (StringUtils.isNotEmpty(to)) { cb.setTo(to); } if (time > 0 ) { cb.setTime(time); } if (msgType != null ) { cb.setMsgType(msgType.ordinal()); } if (StringUtils.isNotEmpty(content)) { cb.setContent(content); } if (StringUtils.isNotEmpty(url)) { cb.setUrl(url); } if (StringUtils.isNotEmpty(property)) { cb.setProperty(property); } if (StringUtils.isNotEmpty(fromNick)) { cb.setFromNick(fromNick); } if (StringUtils.isNotEmpty(json)) { cb.setJson(json); } } }
User
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf4j @Data public class User { private static final AtomicInteger NO = new AtomicInteger (1 ); public enum PLAT_TYPE { WINDOWS, MAC, ANDROID, IOS, WEB, OTHER; } String uid = String.valueOf(NO.getAndIncrement()); String devId = UUID.randomUUID().toString(); String token = UUID.randomUUID().toString(); String nickName = "nickName" ; PLAT_TYPE platform = PLAT_TYPE.MAC; private String sessionId; public void setPlatform (int platform) { PLAT_TYPE[] values = PLAT_TYPE.values(); this .platform = (platform >= 0 && platform < values.length) ? values[platform] : PLAT_TYPE.OTHER; } @Override public String toString () { return "User{" + "uid='" + uid + '\'' + ", devId='" + devId + '\'' + ", token='" + token + '\'' + ", nickName='" + nickName + '\'' + ", platform=" + platform + '}' ; } public static User fromLoginMsg (Msg.LoginRequest info) { User user = new User (); user.uid = info.getUid(); user.devId = info.getDeviceId(); user.token = info.getToken(); user.setPlatform(info.getPlatform()); log.info("登录中: {}" , user); return user; } }
解码和编码器 SimpleProtobufDecoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 import com.google.protobuf.InvalidProtocolBufferException;import com.owlias.im.common.core.ProtoInstant;import com.owlias.im.common.core.bean.msg.Msg;import com.owlias.im.common.core.ex.InvalidFrameException;import com.owlias.im.common.utils.Logger;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import lombok.extern.slf4j.Slf4j;import java.util.List;@Slf4j public class SimpleProtobufDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object outMsg = decode0(ctx, in); if (outMsg != null ) { out.add(outMsg); } } public static Object decode0 (ChannelHandlerContext ctx, ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException { in.markReaderIndex(); if (in.readableBytes() < 8 ) { return null ; } short magic = in.readShort(); if (magic != ProtoInstant.MAGIC_CODE) { String error = "客户端口令不对: " + ctx.channel().remoteAddress(); throw new InvalidFrameException (error); } short version = in.readShort(); if (version != ProtoInstant.VERSION_CODE) { String error = "协议的版本不对: " + ctx.channel().remoteAddress(); throw new InvalidFrameException (error); } int length = in.readInt(); if (length < 0 ) { ctx.close(); } if (in.readableBytes() < length) { in.resetReaderIndex(); return null ; } Logger.cfo("decoder length=" + in.readableBytes()); byte [] array = new byte [length]; in.readBytes(array); return Msg.Message.parseFrom(array); } }
SimpleProtobufEncoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class SimpleProtobufEncoder extends MessageToByteEncoder <Msg.Message> { @Override protected void encode (ChannelHandlerContext ctx, Msg.Message msg, ByteBuf out) throws Exception { encode0(msg, out); } public static void encode0 (Msg.Message msg, ByteBuf out) { out.writeShort(ProtoInstant.MAGIC_CODE); out.writeShort(ProtoInstant.VERSION_CODE); byte [] bytes = msg.toByteArray(); int length = bytes.length; Logger.cfo("encoder length=" + length); out.writeInt(length); out.writeBytes(bytes); } }
异常类定义 InvalidFrameException
1 2 3 4 5 public class InvalidFrameException extends Exception { public InvalidFrameException (String s) { super (s); } }
常数定义 ProtoInstant
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class ProtoInstant { public static final short MAGIC_CODE = 0x86 ; public static final short VERSION_CODE = 0x01 ; public interface Platform { public static final int WINDOWS = 1 ; public static final int MAC = 2 ; public static final int ANDROID = 3 ; public static final int IOS = 4 ; public static final int WEB = 5 ; public static final int UNKNOWN = 6 ; } public enum ResultCodeEnum { SUCCESS(0 , "Success" ), AUTH_FAILED(1 , "登录失败" ), NO_TOKEN(2 , "没有授权码" ), UNKNOWN_ERROR(3 , "未知错误" ), ; private Integer code; private String desc; ResultCodeEnum(Integer code, String desc) { this .code = code; this .desc = desc; } public Integer getCode () { return code; } public String getDesc () { return desc; } } }
server 模块 依赖配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > com.demo</groupId > <artifactId > im-solo</artifactId > <version > 1.0-SNAPSHOT</version > </parent > <artifactId > im-server</artifactId > <dependencies > <dependency > <groupId > com.demo</groupId > <artifactId > im-common</artifactId > <version > ${project.version}</version > </dependency > <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > ${netty.version}</version > </dependency > <dependency > <groupId > com.google.protobuf</groupId > <artifactId > protobuf-java</artifactId > <version > ${protobuf.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-devtools</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-classic</artifactId > <version > ${logback.version}</version > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-core</artifactId > <version > ${logback.version}</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > </dependency > <dependency > <groupId > com.google.guava</groupId > <artifactId > guava</artifactId > <version > ${guava.version}</version > </dependency > </dependencies > <build > <finalName > im-server</finalName > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.11.0</version > <configuration > <source > ${maven.compiler.source}</source > <target > ${maven.compiler.target}</target > <annotationProcessorPaths > <path > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > </path > </annotationProcessorPaths > <compilerArgs > <arg > -parameters</arg > </compilerArgs > <parameters > true</parameters > </configuration > </plugin > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <version > ${spring-boot.version}</version > <executions > <execution > <goals > <goal > repackage</goal > </goals > </execution > </executions > <configuration > <excludeDevtools > true</excludeDevtools > <mainClass > com.owlias.im.server.ServerApp</mainClass > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-assembly-plugin</artifactId > <version > 3.6.0</version > <configuration > <descriptors > <descriptor > src/main/assembly/assembly.xml</descriptor > </descriptors > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
启动相关类 采用了 Spring Boot + Netty 的深度集成方案。利用 Spring 管理业务逻辑(如 LoginRequestHandler),利用 Netty 处理极致的网络 I/O,两者各司其职。在 ServerApp 中,采用非常稳健的启动顺序:
Spring 优先 :先启动 Spring 容器,完成所有 Bean(依赖注入、配置读取)的初始化。
Netty 紧随其后 :从容器中取出 ChatNettyServer 并运行。
保证当 Netty 开始接收连接时,所有的业务处理器(Handler)已经全部就绪,不会出现空指针异常。
ServerApp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ApplicationContext;@SpringBootApplication public class ServerApp { public static void main (String[] args) { ApplicationContext context = SpringApplication.run(ServerApp.class, args); ChatNettyServer nettyServer = context.getBean(ChatNettyServer.class); nettyServer.run(); } }
ChatNettyServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Slf4j @Service public class ChatNettyServer { @Value("${server.port:8081}") private int port; private EventLoopGroup bg; private EventLoopGroup wg; @Resource private LoginRequestHandler loginRequestHandler; @Resource private ServerExceptionHandler serverExceptionHandler; public void run () { bg = new NioEventLoopGroup (1 ); wg = new NioEventLoopGroup (); try { ServerBootstrap b = new ServerBootstrap (); b.group(bg, wg) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress (port)) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.SO_KEEPALIVE, true ) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast("decoder" , new SimpleProtobufDecoder ()); ch.pipeline().addLast("encoder" , new SimpleProtobufEncoder ()); ch.pipeline().addLast("login" , loginRequestHandler); ch.pipeline().addLast("exception" , serverExceptionHandler); } }); ChannelFuture channelFuture = b.bind().sync(); log.info("IM-Server 启动成功, 监听地址: {}" , channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { log.error("IM-Server 启动失败" , e); } finally { close(); } } @PreDestroy public void close () { if (wg != null ) wg.shutdownGracefully(); if (bg != null ) bg.shutdownGracefully(); log.info("IM-Server 已优雅关闭并释放线程资源" ); } }
session 管理 SessionMap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Slf4j @Data public class SessionMap { private SessionMap () {} private static SessionMap singleInstance = new SessionMap (); public static SessionMap inst () { return singleInstance; } private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap <>(); public void addSession (ServerSession s) { map.put(s.getSessionId(), s); log.info("用户登录:id= {} 在线总数: {}" , s.getUser().getUid(), map.size()); } public ServerSession getSession (String sessionId) { return map.getOrDefault(sessionId, null ); } public List<ServerSession> getSessionsBy (String userId) { return map.values() .stream() .filter(s -> s.getUser().getUid().equals(userId)) .collect(Collectors.toList()); } public void removeSession (String sessionId) { if (!map.containsKey(sessionId)) { return ; } ServerSession s = map.get(sessionId); map.remove(sessionId); Logger.tcfo("用户下线:id= " + s.getUser().getUid() + " 在线总数: " + map.size()); } public boolean hasLogin (User user) { for (Map.Entry<String, ServerSession> next : map.entrySet()) { User u = next.getValue().getUser(); if (u.getUid().equals(user.getUid()) && u.getPlatform().equals(user.getPlatform())) { return true ; } } return false ; } }
ServerSession
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 package com.owlias.im.server.session;import com.owlias.im.common.core.bean.User;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.util.AttributeKey;import lombok.Data;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import java.util.UUID;@Data @Slf4j public class ServerSession { public static final AttributeKey<ServerSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY" ); private Channel channel; private User user; @Getter private final String sessionId; private boolean isLogin = false ; public ServerSession (Channel channel) { this .channel = channel; this .sessionId = buildNewSessionId(); } public static ServerSession getSession (ChannelHandlerContext ctx) { Channel channel = ctx.channel(); return channel.attr(ServerSession.SESSION_KEY).get(); } public static void closeSession (ChannelHandlerContext ctx) { ServerSession session = ctx.channel().attr(ServerSession.SESSION_KEY).get(); if (null != session && session.isValid()) { session.close(); SessionMap.inst().removeSession(session.getSessionId()); } } public void reverseBind () { channel.attr(ServerSession.SESSION_KEY).set(this ); log.info("完成 channel 绑定 session {}" , channel.remoteAddress()); SessionMap.inst().addSession(this ); isLogin = true ; } public void unbind () { isLogin = false ; SessionMap.inst().removeSession(getSessionId()); this .close(); } private static String buildNewSessionId () { String uuid = UUID.randomUUID().toString(); return uuid.replaceAll("-" , "" ); } public boolean isValid () { return getUser() != null ; } public synchronized void writeAndFlush (Object pkg) { if (channel.isActive()) { channel.writeAndFlush(pkg).addListener(future -> { if (!future.isSuccess()) { log.error("回写消息失败" , future.cause()); } }); } } public synchronized void close () { ChannelFuture future = channel.close(); future.addListener((ChannelFutureListener) f -> { if (!f.isSuccess()) { log.error("CHANNEL_CLOSED error " ); } }); } public void setUser (User user) { this .user = user; user.setSessionId(sessionId); } }
入站处理链条相关
动态流水线
阶段 A:鉴权态
Decoder
LoginResponseH.
ExceptionH.
➜
Login Success
Dynamic Update
➜
阶段 B:业务态
Decoder
HeartBeatH.
ChatMsgH.
ExceptionH.
SimpleProtobufDecoder -> LoginRequestHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import com.owlias.im.common.concurrent.CallbackTask;import com.owlias.im.common.concurrent.CallbackTaskScheduler;import com.owlias.im.common.core.bean.msg.Msg;import com.owlias.im.server.processor.LoginProcessor;import com.owlias.im.server.session.ServerSession;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;@Slf4j @Service @ChannelHandler .Sharable public class LoginRequestHandler extends ChannelInboundHandlerAdapter { @Resource private LoginProcessor loginProcessor; @Resource private ChatRedirectHandler chatRedirectHandler; @Resource private LogoutRequestHandler logoutRequestHandler; @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.info("收到一个新的连接,但是没有登录 {}" , ctx.channel().id()); super .channelActive(ctx); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof Msg.Message pkg)) { super .channelRead(ctx, msg); return ; } Msg.HeadType headType = pkg.getType(); if (!headType.equals(loginProcessor.type())) { super .channelRead(ctx, msg); return ; } ServerSession session = new ServerSession (ctx.channel()); CallbackTaskScheduler.add(new CallbackTask <Boolean>() { @Override public Boolean execute () throws Exception { return loginProcessor.action(session, pkg); } @Override public void onBack (Boolean r) { if (r) { if (!ctx.channel().isActive()) return ; String currentName = ctx.name(); ctx.pipeline().addAfter(currentName, "heartBeat" , new HeartBeatServerHandler ()); ctx.pipeline().addAfter("heartBeat" , "logout" , logoutRequestHandler); ctx.pipeline().addAfter("logout" , "chat" , chatRedirectHandler); ctx.pipeline().remove(LoginRequestHandler.this ); log.info("登录成功: {}" , session.getUser()); } else { ServerSession.closeSession(ctx); log.info("登录失败: {}" , session.getUser()); } } @Override public void onException (Throwable t) { ServerSession.closeSession(ctx); log.info("登录异常: {}" , session.getUser()); } }); } }
HeartBeatServerHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf4j public class HeartBeatServerHandler extends IdleStateHandler { private static final int READ_IDLE_GAP = 150 ; public HeartBeatServerHandler () { super (READ_IDLE_GAP, 0 , 0 , TimeUnit.SECONDS); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof Msg.Message pkg)) { super .channelRead(ctx, msg); return ; } Msg.HeadType headType = pkg.getType(); if (headType.equals(Msg.HeadType.HEART_BEAT)) { FutureTaskScheduler.add(() -> { if (ctx.channel().isActive()) { ctx.writeAndFlush(msg); } }); } super .channelRead(ctx, msg); } @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent event) { if (event.state() == IdleState.READER_IDLE) { log.warn("心跳读取超时,强制关闭连接: {}" , ctx.channel().id()); ServerSession.closeSession(ctx); } } } }
LogoutRequestHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Service @ChannelHandler .Sharablepublic class LogoutRequestHandler extends ChannelInboundHandlerAdapter { @Resource private LogoutProcessor logoutProcessor; @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof Msg.Message pkg) || !pkg.getType().equals(logoutProcessor.type())) { super .channelRead(ctx, msg); return ; } ServerSession session = ServerSession.getSession(ctx); FutureTaskScheduler.add(() -> logoutProcessor.action(session, pkg)); } }
ChatRedirectHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Slf4j @Service @ChannelHandler .Sharablepublic class ChatRedirectHandler extends ChannelInboundHandlerAdapter { @Resource ChatRedirectProcessor chatRedirectProcessor; public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (null == msg || !(msg instanceof Msg.Message)) { super .channelRead(ctx, msg); return ; } Msg.Message pkg = (Msg.Message) msg; Msg.HeadType headType = ((Msg.Message) msg).getType(); if (!headType.equals(chatRedirectProcessor.type())) { super .channelRead(ctx, msg); return ; } ServerSession session = ctx.channel().attr(ServerSession.SESSION_KEY).get(); if (null == session || !session.isLogin()) { log.error("用户尚未登录,不能发送消息" ); return ; } FutureTaskScheduler.add(() -> chatRedirectProcessor.action(session, pkg)); } }
ServerExceptionHandler:作为整个 Netty 流水线的最后一道防线,ServerExceptionHandler 扮演着后勤部和战场清理员的角色。它的存在确保了系统即使在面对网络波动、恶意攻击或意料之外的代码错误时,依然能保持资源不泄露和状态一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j @ChannelHandler .Sharable@Service public class ServerExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof InvalidFrameException) { log.error("发生未捕获异常: " , cause); } if (cause instanceof IOException) { log.error(cause.getMessage()); log.error("客户端已经关闭连接,这里需要做下线处理" ); ServerSession.closeSession(ctx); } else { log.error("发生未捕获异常: " , cause); } } public void channelReadComplete (ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { ServerSession.closeSession(ctx); } }
线程 Processor ServerProcessor 接口:
1 2 3 4 public interface ServerProcessor { Msg.HeadType type () ; boolean action (ServerSession ch, Msg.Message proto) ; }
LoginProcessor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import com.owlias.im.common.core.ProtoInstant;import com.owlias.im.common.core.bean.User;import com.owlias.im.common.core.bean.msg.Msg;import com.owlias.im.server.converter.LoginResponseConverter;import com.owlias.im.server.session.ServerSession;import com.owlias.im.server.session.SessionMap;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import java.util.List;@Slf4j @Service public class LoginProcessor implements ServerProcessor { @Resource LoginResponseConverter loginResponseConverter; public Msg.HeadType type () { return Msg.HeadType.LOGIN_REQUEST; } @Override public boolean action (ServerSession session, Msg.Message proto) { Msg.LoginRequest info = proto.getLoginRequest(); long seqNo = proto.getSequence(); User user = User.fromLoginMsg(info); boolean isValidUser = checkUser(user); if (!isValidUser) { ProtoInstant.ResultCodeEnum resultCode = ProtoInstant.ResultCodeEnum.NO_TOKEN; Msg.Message response = loginResponseConverter.build(resultCode, seqNo, "-1" ); session.writeAndFlush(response); return false ; } session.setUser(user); session.reverseBind(); ProtoInstant.ResultCodeEnum resultCode = ProtoInstant.ResultCodeEnum.SUCCESS; Msg.Message response = loginResponseConverter.build(resultCode, seqNo, session.getSessionId()); session.writeAndFlush(response); return true ; } private boolean checkUser (User user) { List<ServerSession> oldSessions = SessionMap.inst().getSessionsBy(user.getUid()); if (oldSessions != null && !oldSessions.isEmpty()) { log.warn("用户 {} 重复登录,正在清理旧连接" , user.getUid()); oldSessions.forEach(ServerSession::close); } return true ; } }
LoginResponseConverter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Service public class LoginResponseConverter { public Msg.Message build (ProtoInstant.ResultCodeEnum en, long seqId, String sessionId) { Msg.Message.Builder outer = Msg.Message.newBuilder() .setType(Msg.HeadType.LOGIN_RESPONSE) .setSequence(seqId) .setSessionId(sessionId); Msg.LoginResponse.Builder b = Msg.LoginResponse.newBuilder() .setCode(en.getCode()) .setInfo(en.getDesc()) .setExpose(1 ); outer.setLoginResponse(b.build()); return outer.build(); } }
LogoutProcessor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @Service public class LogoutProcessor implements ServerProcessor { @Override public Msg.HeadType type () { return Msg.HeadType.LOGOUT_REQUEST; } @Override public boolean action (ServerSession session, Msg.Message proto) { if (session != null ) { log.info("用户下线请求: {}" , session.getUser()); session.unbind(); session.close(); } return true ; } }
ChatRedirectProcessor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j @Service public class ChatRedirectProcessor implements ServerProcessor { @Override public Msg.HeadType type () { return Msg.HeadType.MESSAGE_REQUEST; } @Override public boolean action (ServerSession ch, Msg.Message proto) { Msg.MessageRequest msg = proto.getMessageRequest(); Logger.tcfo("chatMsg | from=" + msg.getFrom() + " , to=" + msg.getTo() + " , content=" + msg.getContent()); String to = msg.getTo(); List<ServerSession> toSessions = SessionMap.inst().getSessionsBy(to); if (toSessions == null ) { Logger.tcfo("[" + to + "] 不在线,发送失败!" ); } else { toSessions.forEach((session) -> { session.writeAndFlush(proto); }); } return true ; } }
client 模块 依赖配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > com.demo</groupId > <artifactId > im-solo</artifactId > <version > 1.0-SNAPSHOT</version > </parent > <artifactId > im-client</artifactId > <dependencies > <dependency > <groupId > com.demo</groupId > <artifactId > im-common</artifactId > <version > ${project.version}</version > </dependency > <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > ${netty.version}</version > </dependency > <dependency > <groupId > com.google.protobuf</groupId > <artifactId > protobuf-java</artifactId > <version > ${protobuf.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-devtools</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-classic</artifactId > <version > ${logback.version}</version > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-core</artifactId > <version > ${logback.version}</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > </dependency > <dependency > <groupId > com.google.guava</groupId > <artifactId > guava</artifactId > <version > ${guava.version}</version > </dependency > </dependencies > <build > <finalName > im-client</finalName > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.11.0</version > <configuration > <source > ${maven.compiler.source}</source > <target > ${maven.compiler.target}</target > <annotationProcessorPaths > <path > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > </path > </annotationProcessorPaths > <compilerArgs > <arg > -parameters</arg > </compilerArgs > <parameters > true</parameters > </configuration > </plugin > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <version > ${spring-boot.version}</version > <executions > <execution > <goals > <goal > repackage</goal > </goals > </execution > </executions > <configuration > <excludeDevtools > true</excludeDevtools > <mainClass > com.owlias.im.client.ClientApp</mainClass > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-assembly-plugin</artifactId > <version > 3.6.0</version > <configuration > <descriptors > <descriptor > src/main/assembly/assembly.xml</descriptor > </descriptors > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
启动相关 ClientApp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j @SpringBootApplication public class ClientApp { public static void main (String[] args) { ApplicationContext context = SpringApplication.run(ClientApp.class, args); CommandController commandClient = context.getBean(CommandController.class); commandClient.initCommandMap(); try { commandClient.commandThreadRunning(); } catch (InterruptedException e) { log.error("commandThreadRunning error" , e); } } }
CommandController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 import com.owlias.im.client.commond.*;import com.owlias.im.client.sender.ChatSender;import com.owlias.im.client.sender.LoginSender;import com.owlias.im.client.sender.LogoutSender;import com.owlias.im.client.session.ClientSession;import com.owlias.im.common.core.bean.User;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.util.concurrent.GenericFutureListener;import jakarta.annotation.Resource;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Scanner;@Slf4j @Data @Service public class CommandController { @Resource private ChatNettyClient chatNettyClient; private Map<String, BaseCommand> commandMap; @Resource ClientCommandMenu clientCommandMenu; @Resource ChatConsoleCommand chatConsoleCommand; @Resource LoginConsoleCommand loginConsoleCommand; @Resource LogoutConsoleCommand logoutConsoleCommand; @Resource private ChatSender chatSender; @Resource private LoginSender loginSender; @Resource private LogoutSender logoutSender; private final Object connectLock = new Object (); private boolean connectFlag = false ; private User user; private ClientSession session; private Channel channel; public boolean isLogin () { return null != session && session.isLogin(); } public void initCommandMap () { commandMap = new HashMap <>(); commandMap.put(clientCommandMenu.getKey(), clientCommandMenu); commandMap.put(loginConsoleCommand.getKey(), loginConsoleCommand); commandMap.put(chatConsoleCommand.getKey(), chatConsoleCommand); commandMap.put(logoutConsoleCommand.getKey(), logoutConsoleCommand); clientCommandMenu.setAllCommand(commandMap); } public void commandThreadRunning () throws InterruptedException { Scanner scanner = new Scanner (System.in); while (true ) { clientCommandMenu.exec(scanner); String key = clientCommandMenu.getCommandInput(); BaseCommand command = commandMap.get(key); if (null == command) { System.err.println("请输入正确的指令" ); continue ; } switch (key) { case LoginConsoleCommand.KEY -> startLogin(scanner); case ChatConsoleCommand.KEY -> startOneChat(scanner); case LogoutConsoleCommand.KEY -> startLogout(); } } } private void startLogin (Scanner scanner) { if (!connectFlag) { log.info("检测到连接已断开,正在尝试重新连接..." ); startConnectServer(); synchronized (connectLock) { try { long startTime = System.currentTimeMillis(); while (!connectFlag && (System.currentTimeMillis() - startTime < 5000 )) { connectLock.wait(5000 - (System.currentTimeMillis() - startTime)); } } catch (InterruptedException e) { log.error("等待连接中断" , e); } } } if (!connectFlag) { log.info("连接异常,请重新建立连接" ); return ; } loginConsoleCommand.exec(scanner); User user = new User (); user.setUid(loginConsoleCommand.getUserName()); user.setToken(loginConsoleCommand.getPassword()); user.setDevId("123456" ); session.setUser(user); this .user = user; loginSender.setUser(user); loginSender.setSession(session); loginSender.sendLoginMsg(); } public void startConnectServer () { GenericFutureListener<ChannelFuture> listener = (ChannelFuture f) -> { synchronized (connectLock) { if (!f.isSuccess()) { log.info("连接失败!" ); connectFlag = false ; } else { log.info("Owlias-IM 服务器连接成功!" ); connectFlag = true ; channel = f.channel(); session = new ClientSession (channel); session.setConnected(true ); channel.closeFuture().addListener(closeFuture -> { synchronized (connectLock) { log.info("{}: 连接已经断开..." , new Date ()); connectFlag = false ; if (session != null ) { session.setConnected(false ); session.setLogin(false ); } } }); } connectLock.notifyAll(); } }; chatNettyClient.setConnectedListener(listener); chatNettyClient.doConnect(); } private void startOneChat (Scanner scanner) { if (!isLogin()) { log.info("startOneChat: 还没有登录,请先登录" ); return ; } chatConsoleCommand.exec(scanner); chatSender.setUser(user); chatSender.setSession(session); chatSender.sendChatMsg(chatConsoleCommand.getToUserId(), chatConsoleCommand.getMessage()); } private void startLogout () { if (!isLogin()) { log.info("未登录" ); return ; } logoutSender.setUser(user); logoutSender.setSession(session); logoutSender.sendLogoutMsg(); session.setLogin(false ); connectFlag = false ; log.info("本地会话已注销" ); } }
ChatNettyClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 import com.owlias.im.client.config.SystemConfig;import com.owlias.im.client.handler.ExceptionHandler;import com.owlias.im.client.handler.LoginResponseHandler;import com.owlias.im.common.core.codec.SimpleProtobufDecoder;import com.owlias.im.common.core.codec.SimpleProtobufEncoder;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.concurrent.GenericFutureListener;import jakarta.annotation.Resource;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.springframework.context.ApplicationContext;import org.springframework.stereotype.Service;@Slf4j @Data @Service public class ChatNettyClient { @Resource private SystemConfig systemConfig; @Resource private LoginResponseHandler loginResponseHandler; @Resource private ApplicationContext applicationContext; private GenericFutureListener<ChannelFuture> connectedListener; private Bootstrap bootstrap; private EventLoopGroup g; public ChatNettyClient () { g = new NioEventLoopGroup (1 ); } public void doConnect () { if (connectedListener == null ) { log.error("未设置 connectedListener,连接取消" ); return ; } try { bootstrap = new Bootstrap () .group(g) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .remoteAddress(systemConfig.getHost(), systemConfig.getPort()) .handler(new ChannelInitializer <SocketChannel>() { public void initChannel (SocketChannel ch) { ch.pipeline().addLast("decoder" , new SimpleProtobufDecoder ()); ch.pipeline().addLast("encoder" , new SimpleProtobufEncoder ()); ch.pipeline().addLast(loginResponseHandler); ch.pipeline().addLast(applicationContext.getBean(ExceptionHandler.class)); } } ); log.info("客户端开始连接 [Owlias IM]" ); ChannelFuture f = bootstrap.connect(); f.addListener(connectedListener); } catch (Exception e) { log.info("客户端连接失败! {}" , e.getMessage()); } } public void close () { g.shutdownGracefully(); } }
session 相关 ClientSession
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 @Slf4j @Data public class ClientSession { public static final AttributeKey<ClientSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY" ); private Channel channel; private User user; private String sessionId; private boolean isConnected = false ; private boolean isLogin = false ; public ClientSession (Channel channel) { this .channel = channel; this .sessionId = UUID.randomUUID().toString(); channel.attr(ClientSession.SESSION_KEY).set(this ); } public static void loginSuccess (ChannelHandlerContext ctx, Msg.Message pkg) { ClientSession session = getSession(ctx); session.setSessionId(pkg.getSessionId()); session.setLogin(true ); log.info("登录成功" ); } public static ClientSession getSession (ChannelHandlerContext ctx) { Channel channel = ctx.channel(); return channel.attr(ClientSession.SESSION_KEY).get(); } public String getRemoteAddress () { return channel.remoteAddress().toString(); } public ChannelFuture writeAndFlush (Object pkg) { return channel.writeAndFlush(pkg); } public void writeAndClose (Object pkg) { ChannelFuture future = channel.writeAndFlush(pkg); future.addListener(ChannelFutureListener.CLOSE); } public void close () { isConnected = false ; ChannelFuture future = channel.close(); future.addListener((ChannelFutureListener) future1 -> { if (future1.isSuccess()) { log.error("连接顺利断开" ); } }); } }
converter 相关 BaseConverter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.owlias.im.client.session.ClientSession;import com.owlias.im.common.core.bean.msg.Msg;public class BaseConverter { protected Msg.HeadType type; private long seqId; private ClientSession session; public BaseConverter (Msg.HeadType type, ClientSession session) { this .type = type; this .session = session; } public Msg.Message buildOuter (long seqId) { return getOuterBuilder(seqId).buildPartial(); } public Msg.Message.Builder getOuterBuilder (long seqId) { this .seqId = seqId; return Msg.Message.newBuilder() .setType(type) .setSessionId(session.getSessionId()) .setSequence(seqId); } }
LoginMsgConverter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class LoginMsgConverter extends BaseConverter { private final User user; public LoginMsgConverter (User user, ClientSession session) { super (Msg.HeadType.LOGIN_REQUEST, session); this .user = user; } public Msg.Message build () { Msg.Message.Builder outerBuilder = getOuterBuilder(-1 ); Msg.LoginRequest.Builder lb = Msg.LoginRequest.newBuilder() .setDeviceId(user.getDevId()) .setPlatform(user.getPlatform().ordinal()) .setToken(user.getToken()) .setUid(user.getUid()); return outerBuilder.setLoginRequest(lb).build(); } public static Msg.Message build (User user, ClientSession session) { LoginMsgConverter converter = new LoginMsgConverter (user, session); return converter.build(); } }
HeartBeatMsgConverter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class HeartBeatMsgConverter extends BaseConverter { private final User user; public HeartBeatMsgConverter (User user, ClientSession session) { super (Msg.HeadType.HEART_BEAT, session); this .user = user; } public Msg.Message build () { Msg.Message.Builder outerBuilder = getOuterBuilder(-1 ); Msg.MessageHeartBeat.Builder inner = Msg.MessageHeartBeat.newBuilder() .setSeq(0 ) .setJson("{\"from\":\"client\"}" ) .setUid(user.getUid()); return outerBuilder.setHeartBeat(inner).build(); } }
ChatMsgConverter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ChatMsgConverter extends BaseConverter { private ChatMsg chatMsg; private User user; private ChatMsgConverter (ClientSession session) { super (Msg.HeadType.MESSAGE_REQUEST, session); } public Msg.Message build (ChatMsg chatMsg, User user) { this .chatMsg = chatMsg; this .user = user; Msg.Message.Builder outerBuilder = getOuterBuilder(-1 ); Msg.MessageRequest.Builder cb = Msg.MessageRequest.newBuilder(); this .chatMsg.fillMsg(cb); return outerBuilder.setMessageRequest(cb).build(); } public static Msg.Message build ( ChatMsg chatMsg, User user, ClientSession session) { ChatMsgConverter chatMsgConverter = new ChatMsgConverter (session); return chatMsgConverter.build(chatMsg, user); } }
command 相关 BaseCommand
1 2 3 4 5 6 7 8 9 10 11 public interface BaseCommand { String getKey () ; String getTip () ; void exec (Scanner scanner) ; }
ClientCommandMenu
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Data @Service public class ClientCommandMenu implements BaseCommand { public static final String KEY = "0" ; private String allCommandsShow; private String commandInput; @Override public String getKey () { return KEY; } @Override public String getTip () { return "show 所有命令" ; } @Override public void exec (Scanner scanner) { System.err.println("请输入某个操作指令:" + allCommandsShow); commandInput = scanner.next(); } public void setAllCommand (Map<String, BaseCommand> commandMap) { Set<Map.Entry<String, BaseCommand>> entries = commandMap.entrySet(); Iterator<Map.Entry<String, BaseCommand>> iterator = entries.iterator(); StringBuilder menus = new StringBuilder (); menus.append("[menu] " ); while (iterator.hasNext()) { BaseCommand next = iterator.next().getValue(); menus.append(next.getKey()) .append("->" ) .append(next.getTip()) .append(" | " ); } allCommandsShow = menus.toString(); } }
LoginConsoleCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Data @Service public class LoginConsoleCommand implements BaseCommand { public static final String KEY = "1" ; private String userName; private String password; @Override public void exec (Scanner scanner) { System.out.println("请输入用户信息(id@password) " ); String[] info = null ; while (true ) { String input = scanner.next(); info = input.split("@" ); if (info.length != 2 ) { System.out.println("请按照格式输入(id@password):" ); } else { break ; } } userName = info[0 ]; password = info[1 ]; } @Override public String getKey () { return KEY; } @Override public String getTip () { return "登录" ; } }
ChatConsoleCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Data @Service public class implements BaseCommand { private String toUserId; private String message; public static final String KEY = "2" ; @Override public void exec (Scanner scanner) { System.out.print("请输入聊天的消息(id:message):" ); String[] info = null ; while (true ) { String input = scanner.nextLine(); info = input.split(":" ); if (info.length != 2 ) { System.out.println("请输入聊天的消息(id:message):" ); } else { break ; } } toUserId = info[0 ]; message = info[1 ]; } @Override public String getKey () { return KEY; } @Override public String getTip () { return "聊天" ; } }
LogoutConsoleCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Service public class LogoutConsoleCommand implements BaseCommand { public static final String KEY = "10" ; @Override public void exec (Scanner scanner) { Logger.cfo("退出命令执行成功" ); } @Override public String getKey () { return KEY; } @Override public String getTip () { return "退出" ; } }
sender 相关 BaseSender
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Data @Slf4j public abstract class BaseSender { private User user; private ClientSession session; public boolean isConnected () { if (null == session) { log.info("isConnected: session is null" ); return false ; } return session.isConnected(); } public boolean isLogin () { if (null == session) { log.info("isLogin: session is null" ); return false ; } return session.isLogin(); } public void sendMsg (Msg.Message message) { if (null == getSession() || !isConnected()) { log.info("连接还没成功" ); return ; } Channel channel = getSession().getChannel(); ChannelFuture f = channel.writeAndFlush(message); f.addListener(future -> { if (future.isSuccess()) { sendSuccessCallback(message); } else { sendFailedCallback(message); } }); } protected void sendSuccessCallback (Msg.Message message) { log.info("发送成功" ); } protected void sendFailedCallback (Msg.Message message) { log.info("发送失败" ); } }
LoginSender
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j @Service public class LoginSender extends BaseSender { public void sendLoginMsg () { if (!isConnected()) { log.info("还没有建立连接!" ); return ; } log.info("构造登录消息" ); Msg.Message message = LoginMsgConverter.build(getUser(), getSession()); log.info("发送登录消息" ); super .sendMsg(message); } }
ChatSender
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j @Service public class ChatSender extends BaseSender { public void sendChatMsg (String toUid, String content) { log.info("发送消息 startConnectServer" ); ChatMsg chatMsg = new ChatMsg (getUser()); chatMsg.setContent(content); chatMsg.setMsgType(ChatMsg.MSG_TYPE.TEXT); chatMsg.setTo(toUid); chatMsg.setMsgId(System.currentTimeMillis()); Msg.Message message = ChatMsgConverter.build(chatMsg, getUser(), getSession()); super .sendMsg(message); } @Override protected void sendSuccessCallback (Msg.Message message) { log.info("发送成功:{}" , message.getMessageRequest().getContent()); } @Override protected void sendFailedCallback (Msg.Message message) { log.info("发送失败:{}" , message.getMessageRequest().getContent()); } }
LogoutSender
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Slf4j @Service public class LogoutSender extends BaseSender { public void sendLogoutMsg () { if (!isLogin()) return ; Msg.Message message = Msg.Message.newBuilder() .setType(Msg.HeadType.LOGOUT_REQUEST) .setSessionId(getSession().getSessionId()) .build(); super .sendMsg(message); log.info("登出请求已发送" ); } }
handler 相关 LoginResponseHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Slf4j @ChannelHandler .Sharable@Service public class LoginResponseHandler extends ChannelInboundHandlerAdapter { @Resource private ChatMsgHandler chatMsgHandler; @Resource private HeartBeatClientHandler heartBeatClientHandler; @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof Msg.Message pkg)) { super .channelRead(ctx, msg); return ; } Msg.HeadType headType = pkg.getType(); if (!headType.equals(Msg.HeadType.LOGIN_RESPONSE)) { super .channelRead(ctx, msg); return ; } Msg.LoginResponse info = pkg.getLoginResponse(); ProtoInstant.ResultCodeEnum result = ProtoInstant.ResultCodeEnum.values()[info.getCode()]; if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) { log.info(result.getDesc()); } else { ClientSession.loginSuccess(ctx, pkg); ChannelPipeline p = ctx.pipeline(); p.addBefore(ctx.name(), "heartbeat" , heartBeatClientHandler); p.addBefore(ctx.name(), "chat" , chatMsgHandler); heartBeatClientHandler.startHeartBeat(ctx); p.remove(this ); log.info("登录成功,业务处理器已就绪" ); } } }
HeartBeatClientHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 @Slf4j @ChannelHandler .Sharable@Service public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter { private static final int HEARTBEAT_INTERVAL = 50 ; private boolean isStarted = false ; @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { if (!isStarted) { startHeartBeat(ctx); isStarted = true ; } } public void startHeartBeat (ChannelHandlerContext ctx) { if (!isStarted) { ClientSession session = ClientSession.getSession(ctx); Msg.Message message = new HeartBeatMsgConverter (session.getUser(), session).build(); heartBeat(ctx, message); isStarted = true ; } } public void heartBeat (ChannelHandlerContext ctx, Msg.Message heartbeatMsg) { ctx.executor().schedule(() -> { if (ctx.channel().isActive()) { if (ctx.channel().isWritable()) { log.info("发送 HEART_BEAT 消息到 server" ); ctx.writeAndFlush(heartbeatMsg); } else { log.warn("通道繁忙,跳过本次心跳发送" ); } heartBeat(ctx, heartbeatMsg); } }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof Msg.Message pkg)) { super .channelRead(ctx, msg); return ; } Msg.HeadType headType = pkg.getType(); if (headType.equals(Msg.HeadType.HEART_BEAT)) { log.info("收到回写的 HEART_BEAT 消息 from server" ); return ; } else { super .channelRead(ctx, msg); } } }
ChatMsgHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @ChannelHandler .Sharable@Service public class ChatMsgHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof Msg.Message pkg)) { super .channelRead(ctx, msg); return ; } Msg.HeadType headType = pkg.getType(); if (!headType.equals(Msg.HeadType.MESSAGE_REQUEST)) { super .channelRead(ctx, msg); return ; } Msg.MessageRequest req = pkg.getMessageRequest(); String content = req.getContent(); String uid = req.getFrom(); System.out.println(" 收到消息 from uid:" + uid + " -> " + content); } }
ExceptionHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Slf4j @ChannelHandler .Sharable@Service public class ExceptionHandler extends ChannelInboundHandlerAdapter { @Resource private CommandController commandController; @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof InvalidFrameException) { log.error(cause.getMessage()); ClientSession.getSession(ctx).close(); } else { log.error(cause.getMessage()); ctx.close(); if (null == commandController) { return ; } commandController.setConnectFlag(false ); commandController.startConnectServer(); } } public void channelReadComplete (ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
使用 assembly 打生产包 如何配置
服务端 im-server
依赖配置:pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 <build > <finalName > im-server</finalName > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.11.0</version > <configuration > <source > ${maven.compiler.source}</source > <target > ${maven.compiler.target}</target > <annotationProcessorPaths > <path > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > </path > </annotationProcessorPaths > <compilerArgs > <arg > -parameters</arg > </compilerArgs > <parameters > true</parameters > </configuration > </plugin > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <version > ${spring-boot.version}</version > <executions > <execution > <goals > <goal > repackage</goal > </goals > </execution > </executions > <configuration > <excludeDevtools > true</excludeDevtools > <mainClass > com.owlias.im.server.ServerApp</mainClass > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-assembly-plugin</artifactId > <version > 3.6.0</version > <configuration > <descriptors > <descriptor > src/main/assembly/assembly.xml</descriptor > </descriptors > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build >
日志配置:src/main/resources/lockback.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 <?xml version="1.0" encoding="UTF-8" ?> <configuration > <property name ="APP_Name" value ="im-server" /> <contextName > ${APP_Name}</contextName > <property name ="LOG_HOME" value ="${im_server_path:-./logs}" /> <property name ="LOG_PATTERN" value ="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40}:%L - %msg%n" /> <appender name ="STDOUT" class ="ch.qos.logback.core.ConsoleAppender" > <encoder > <pattern > ${LOG_PATTERN}</pattern > <charset > utf8</charset > </encoder > </appender > <appender name ="FILE" class ="ch.qos.logback.core.rolling.RollingFileAppender" > <file > ${LOG_HOME}/${APP_Name}.log</file > <rollingPolicy class ="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy" > <FileNamePattern > ${LOG_HOME}/history/${APP_Name}-%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern > <maxFileSize > 100MB</maxFileSize > <MaxHistory > 30</MaxHistory > <totalSizeCap > 20GB</totalSizeCap > </rollingPolicy > <encoder > <pattern > ${LOG_PATTERN}</pattern > <charset > utf8</charset > </encoder > </appender > <appender name ="ASYNC_FILE" class ="ch.qos.logback.classic.AsyncAppender" > <discardingThreshold > 0</discardingThreshold > <queueSize > 1024</queueSize > <appender-ref ref ="FILE" /> <includeCallerData > true</includeCallerData > </appender > <logger name ="org.springframework" level ="WARN" /> <logger name ="io.netty" level ="WARN" /> <logger name ="org.hibernate" level ="WARN" /> <logger name ="com.apache.ibatis" level ="WARN" /> <logger name ="org.apache.shiro" level ="WARN" /> <logger name ="springfox.documentation" level ="WARN" /> <root level ="INFO" > <appender-ref ref ="STDOUT" /> <appender-ref ref ="ASYNC_FILE" /> </root > </configuration >
项目配置文件:src/main/resources/application.properties
1 2 3 4 chat.server.port =9000 chat.server.ip =127.0.0.1 server.web.user.url =http://localhost:8080/user spring.main.web-application-type =none
打包配置:src/main/assembly/assembly.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 <assembly > <id > Alpha</id > <formats > <format > tar.gz</format > </formats > <includeBaseDirectory > true</includeBaseDirectory > <fileSets > <fileSet > <directory > src/main/assembly/bin</directory > <outputDirectory > bin</outputDirectory > <fileMode > 0755</fileMode > </fileSet > <fileSet > <directory > target</directory > <outputDirectory > lib</outputDirectory > <includes > <include > ${project.build.finalName}.jar</include > </includes > </fileSet > <fileSet > <directory > src/main/resources</directory > <outputDirectory > config</outputDirectory > <includes > <include > application.properties</include > <include > logback.xml</include > </includes > </fileSet > </fileSets > </assembly >
启动可执行文件:src/main/assembly/bin/start.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #!/bin/bash WORK_PATH=$(cd $(dirname $0 )/..; pwd ) JAR_NAME="im-client.jar" JVM="-server -Xms128m -Xmx512m" export im_client_path="${WORK_PATH} /logs" mkdir -p ${im_client_path} function start () { echo "Starting IM-Client..." java ${JVM} -Dlogging.config=${WORK_PATH} /config/logback.xml -jar ${WORK_PATH} /lib/${JAR_NAME} --spring.config.location=${WORK_PATH} /config/application.properties } function stop () { pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}' ) if [ -n "$pid " ]; then echo "Stopping IM-Client (pid: $pid )..." kill -15 "$pid " sleep 3 kill -9 "$pid " 2>/dev/null else echo "${JAR_NAME} is not running." fi } function status () { pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}' ) if [ -n "$pid " ]; then echo "${JAR_NAME} is running, PID is $pid " else echo "${JAR_NAME} is stopped" fi } case "$1 " in start) start ;; stop) stop ;; restart) stop; start ;; status) status ;; *) echo "Usage: $0 {start|stop|restart|status}" ;; esac
客户端 im-client
日志配置:src/main/resources/lockback.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 <?xml version="1.0" encoding="UTF-8" ?> <configuration > <property name ="APP_Name" value ="im-client" /> <property name ="LOG_HOME" value ="${im_server_path:-./logs}" /> <contextName > ${APP_Name}</contextName > <conversionRule conversionWord ="clr" class ="org.springframework.boot.logging.logback.ColorConverter" /> <conversionRule conversionWord ="wEx" class ="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" /> <property name ="CONSOLE_LOG_PATTERN" value ="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(LN:%L){faint} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}" /> <appender name ="STDOUT" class ="ch.qos.logback.core.ConsoleAppender" > <encoder > <pattern > ${CONSOLE_LOG_PATTERN}</pattern > <charset > utf8</charset > </encoder > </appender > <appender name ="FILE" class ="ch.qos.logback.core.rolling.RollingFileAppender" > <file > ${LOG_HOME}/${APP_Name}.log</file > <rollingPolicy class ="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy" > <FileNamePattern > ${LOG_HOME}/history/${APP_Name}-%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern > <maxFileSize > 50MB</maxFileSize > <MaxHistory > 7</MaxHistory > <totalSizeCap > 1GB</totalSizeCap > </rollingPolicy > <encoder > <pattern > %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40}:%L - %msg%n</pattern > <charset > utf8</charset > </encoder > </appender > <logger name ="org.springframework" level ="WARN" /> <logger name ="io.netty" level ="WARN" /> <logger name ="org.apache.http" level ="WARN" /> <root level ="INFO" > <appender-ref ref ="STDOUT" /> <appender-ref ref ="FILE" /> </root > </configuration >
打包配置:src/main/assembly/assembly.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 <assembly > <id > Alpha</id > <formats > <format > tar.gz</format > </formats > <includeBaseDirectory > true</includeBaseDirectory > <fileSets > <fileSet > <directory > src/main/assembly/bin</directory > <outputDirectory > bin</outputDirectory > <fileMode > 0755</fileMode > </fileSet > <fileSet > <directory > target</directory > <outputDirectory > lib</outputDirectory > <includes > <include > ${project.build.finalName}.jar</include > </includes > </fileSet > <fileSet > <directory > src/main/resources</directory > <outputDirectory > config</outputDirectory > <includes > <include > application.properties</include > <include > logback.xml</include > </includes > </fileSet > </fileSets > </assembly >
启动可执行文件:src/main/assembly/bin/start.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #!/bin/bash WORK_PATH=$(cd $(dirname $0 )/..; pwd ) JAR_NAME="im-client.jar" JVM="-server -Xms128m -Xmx512m" export im_client_path="${WORK_PATH} /logs" mkdir -p ${im_client_path} function start () { echo "Starting IM-Client..." java ${JVM} -Dlogging.config=${WORK_PATH} /config/logback.xml -jar ${WORK_PATH} /lib/${JAR_NAME} --spring.config.location=${WORK_PATH} /config/application.properties } function stop () { pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}' ) if [ -n "$pid " ]; then echo "Stopping IM-Client (pid: $pid )..." kill -15 "$pid " sleep 3 kill -9 "$pid " 2>/dev/null else echo "${JAR_NAME} is not running." fi } function status () { pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}' ) if [ -n "$pid " ]; then echo "${JAR_NAME} is running, PID is $pid " else echo "${JAR_NAME} is stopped" fi } case "$1 " in start) start ;; stop) stop ;; restart) stop; start ;; status) status ;; *) echo "Usage: $0 {start|stop|restart|status}" ;; esac
如何使用 服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 $ tar -zxvf im-server-Alpha.tar.gz $ cd im-server $ tree . ├── bin │ └── start.sh ├── config │ ├── application.properties │ └── logback.xml ├── lib │ └── im-server.jar └── logs ├── im-server.log └── console.log $ bin/start.sh start Starting IM-Server... Service started, check logs at xxx/im-server/logs/console.log
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 $ bin/start.sh start Starting IM-Client... 2026-04-18 20:37:23.685 INFO --- [ main] com.owlias.im.client.ClientApp LN:59 : Started ClientApp in 1.156 seconds (process running for 2.001) 请输入某个操作指令:[menu] 0->show 所有命令 | 1->登录 | 2->聊天 | 10->退出 1 请输入用户信息(id @password) zhangsan@111 2026-04-18 20:42:20.080 INFO --- [ntLoopGroup-2-1] c.o.im.client.session.ClientSession LN:49 : 登录成功 2026-04-18 20:42:20.081 INFO --- [ntLoopGroup-2-1] c.o.i.c.handler.LoginResponseHandler LN:65 : 登录成功,业务处理器已就绪 请输入某个操作指令:[menu] 0->show 所有命令 | 1->登录 | 2->聊天 | 10->退出 | 2 请输入聊天的消息(id :message):请输入聊天的消息(id :message): zhangsan:hello zhangsan 收到消息 from uid:zhangsan -> hello zhangsan ...
分布式IM需要解决的核心问题 问题描述 假如服务端是由三个节点1、2、3组成的,通常情况下我们理解的 “客户端A连接节点1,连上了就一直与节点1通信,不存在A的消息发送到节点2会3的情况”,这种理解是错误的,而且分布式场景下我们也要考虑端对端的各种通信路由,比如:
物理链路断开后的“重连漂移” :这是最常见的情况。用户 A 原本连着节点 1,但突然 A 经过了一个隧道,手机信号切换(或者节点1负载过高踢掉 A)。这个时候,A的物理连接断开了,A客户端逻辑自动发起断线重连,负载均衡器(Nginx/LVS)发现节点1很忙,就把这次新的连接请求分配给了节点2。结果A就在节点2上 “安家”了。如果此时有人给 A 发消息,消息就必须去节点2找他。
发送者与接收者不在同一个节点 :这是分布式 IM 的本质场景。用户 A 连在节点 1,他的好朋友用户 B 连在节点 2,用户 B 想给用户 A 发一句“你好”。B 的消息首先物理到达节点 2,节点 2 检查本地内存,发现 A 不在自己这里,节点 2 必须把这条消息 “跨机” 送到节点 1。虽然 A 始终只跟节点 1 说话,但 A 要接收的消息,最初是出现在节点 2 上的。
多端登录与信令路由 :用户 A 同时用电脑(节点 1)和手机(节点 2)登录,有人给 A 发了一条消息,服务端必须保证 A 的两个设备都能收到。无论系统决定先处理哪端的逻辑,消息都不可避免地要在两个节点之间同步。
分布式的复杂性在于:发送者可能在任何地方。 只要用户 B 的物理连接落在了节点 2,那么他发给 A 的每一句话,第一站都是节点 2。节点 2 就像一个快递中转站,必须负责把消息发往 A 所在的 “片区”(节点 1)。这就使得我们在分布式情况下,必须考虑 “跨节点路由” 的本质原因。
业界做法 在实际工业界,针对千万级甚至亿级日活的场景(如腾讯、阿里、字节的 IM 架构),最好的解决方案通常是 “接入与逻辑彻底分离 + 全局状态网格” 。这种架构被称为 Gateway-Logic 架构 。这种架构解决了 “消息中转” 问题:
解耦 :业务员(Logic)不需要知道接线员(Gateway)的具体 IP,只需要查询 Redis 路由。
性能 :Logic 层可以根据计算需求独立扩容,而 Gateway 只负责搬运字节。
可靠性 :即使一个 Logic 节点挂了,MQ 的消息还在,其他 Logic 节点接手后依然能通过路由找到 Gateway 里的 A。
三层分离架构模型 为了解决 “重连漂移” 和 “跨机投递”,我们通常不会让 Netty 节点既当接线员又当业务员。
在分布式 IM 的世界里,地址簿(Routing Table) 就是灵魂。无论客户端漂移到哪个节点,只要地址簿是实时更新的,消息就能通过这套 “中转体系”准确送达。我们的单机版IM如果想要改造成分布式版,第一步就是将 SessionMap 从本地内存移到 Redis 中,并让服务端具备 “根据 Redis 地址转发消息” 的能力。
三层架构的具体实现 场景一:跨节点路由(A在节点1,B在节点2)—— MQ 扇出模式
最佳实践:消息队列 (MQ) + 路由索引。
具体流程:
B 发消息给 A,请求到达网关 2。
网关 2 把请求扔给逻辑层的随机一个节点。
逻辑层查 Redis 路由表,发现 UserA 在 Gateway_1。
逻辑层将消息投递到 MQ 的某个 Exchange,或者直接 RPC 调用网关 1的推送接口。
网关 1 收到指令,从内存里找出 A 的 Channel,推下去。
场景二:多端登录(电脑在节点1,手机在节点2)—— 双向 Diff 同步
最佳实践:逻辑层多点投递 + 读扩散/写扩散优化。
具体流程:
逻辑层发现 A 有两个活跃网关连接。
逻辑层并行给网关 1 和网关 2 发送推送指令。
场景三:重连漂移 —— Session 抢占与清理
最佳实践:分布式 Session 锁。
具体流程:
A 连接到网关 2。
网关 2 告诉逻辑层:“A 在我这登录了”。
逻辑层更新 Redis 路由为 “A -> Gateway_2”,并发现 A 之前在 “Gateway_1”。
逻辑层向 Gateway_1 发起 “踢人” 指令。Gateway_1 收到后断开旧连接。
另外,为了保证在复杂的网络环境下消息的可靠性,我们主动引入 SeqId 机制 。SeqId 是一个用户维度单调递增的数字,通常是64位长整型。服务端会在数据库里记录发送给 A 的当前最大 ID 是多少,比如 max_seq = 100。要理解序列号ID,我们得先抛弃 “服务端直接把消息推给客户端就完事了” 的简单想法。在不稳定的移动网络下,单靠服务端推送(Push)是无法保证消息不丢(网络闪断没收到)和不乱(后发的先到了)的。
情景一 :假设用户 B 连续给 A 发了两条消息:“你好”(Seq: 101)、“在吗?”(Seq: 102)。由于网络路径不同,Seq 102 可能会比 Seq 101先到达 A 的手机。如果没有 Seq ID,A 看到的对话顺序会变成 “在吗?— 你好”,逻辑全乱了。有了 Seq ID,客户端 A 收到 102 时,发现本地最大 ID 还是 100,它知道中间漏了 101。此时客户端可以暂时不显示 102,或者在界面上按 ID 重新排序。
情景二 :当客户端 A 收到 Seq 为 105 的消息,但本地最后一条消息是 102 时,客户端立刻意识到:中间的 103 和 104 丢了!此时客户端会自动向服务端发起 PullRequest(start=103, end=104) 请求,把丢失的消息补回来。
情景三 :当用户 A 经过隧道重新连上节点 2 时:客户端A上报告诉服务器:“我本地最后一条消息 ID 是 105”。服务器查数据库发现:“发给 A 的最大 ID 已经是 110 了”。服务器不再等待 A 说话,直接把 106 到 110 的消息打包推给 A。
技术选型建议
实际完整架构 实际的架构通常如下:
接入层(Netty 集群) :开发者通常手动编写基于 Netty 的 Spring Boot 程序。每个节点会暴露两个接口:
面向用户:TCP/WebSocket 端口(如 8081),维持长连接。
面向内部:RPC 端口(如 Dubbo/gRPC),让逻辑层能找回来。
逻辑层(Spring Cloud 微服务集群) :使用 Spring Cloud 体系(Nacos, Sentinel, OpenFeign)。
当一个消息进来,接入层通过 Spring Cloud 的负载均衡算法,随机挑选一个逻辑节点处理业务。
状态网格(Redis + MQ) :这是连接两层的纽带。
Redis:存储 “UID -> Gateway_IP”。逻辑层处理完业务,查 Redis 知道该发往哪个网关。
RocketMQ/RabbitMQ:逻辑层将推送指令发给 MQ,各个网关节点订阅自己的队列。网关层和逻辑层异步解耦。
为什么不直接用 Spring Cloud Gateway 做接入层呢?虽然 Spring Cloud Gateway 也是基于 Netty 的(Reactor Netty),但 IM 系统有几个特殊需求:
私有协议支持:我们需要处理自定义的 Protobuf。
内存占用极致优化:接入层节点需要承载百万级连接,应手动控制 Netty 的 ByteBuf 申请和释放(池化)。
心跳策略定制:IM 的心跳往往带有业务属性(如多端同步位点),需要深度控制 Pipeline。
标题:
Java NIO - 基于 Netty 单体 IM 系统项目实践